## 29.2.1 LLM 客户端
LLM 客户端是编程 Agent 与大语言模型交互的核心组件,负责处理 API 调用、响应解析、错误处理等。
### 基础实现
class LLMClient:
    """Client for a chat-completions style LLM HTTP API.

    Builds the message list, POSTs it to ``{base_url}/chat/completions``,
    extracts the first choice's text, caches results per
    (prompt, context, kwargs) and keeps request/cache/error counters.
    """

    def __init__(self, config: "LLMConfig"):
        """Copy settings from ``config`` and create the session, cache and counters.

        Args:
            config: Settings object. This class reads ``api_key``,
                ``base_url``, ``model``, ``max_tokens``, ``temperature``
                and ``system_prompt`` from it.
        """
        self.config = config
        self.api_key = config.api_key
        self.base_url = config.base_url
        self.model = config.model
        self.max_tokens = config.max_tokens
        self.temperature = config.temperature

        # Session management. The synchronous session is kept as a public
        # attribute; the async request path builds the same headers itself.
        self.session = requests.Session()
        self.session.headers.update(self._request_headers())

        # Response cache.
        # NOTE(review): the code below calls ``cache.get()`` and
        # ``cache.set()``; confirm the LRUCache in use provides both.
        self.cache = LRUCache(maxsize=1000)

        # Counters.
        self.stats = {
            'total_requests': 0,
            'cache_hits': 0,
            'errors': 0
        }

    def _request_headers(self) -> Dict[str, str]:
        """Return the HTTP headers every API request must carry."""
        return {
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json'
        }

    async def complete(self, prompt: str,
                       context: List[Dict] = None,
                       **kwargs) -> str:
        """Generate a completion for ``prompt``.

        Args:
            prompt: Text sent as the final user message.
            context: Optional earlier messages placed before the prompt.
            **kwargs: Extra request parameters. They are merged last, so
                they override ``model``, ``max_tokens``, ``temperature``
                and any other key of the request body.

        Returns:
            The generated text, served from the cache when available.

        Raises:
            Exception: Any failure while sending or parsing is counted,
                logged and re-raised unchanged.
        """
        # Check the cache first.
        cache_key = self._generate_cache_key(prompt, context, kwargs)
        cached_response = self.cache.get(cache_key)
        # Compare against None so a cached empty string still counts as a hit.
        if cached_response is not None:
            self.stats['cache_hits'] += 1
            return cached_response

        # Build the request body; kwargs win over the configured defaults.
        messages = self._build_messages(prompt, context)
        params = {
            'model': self.model,
            'messages': messages,
            'max_tokens': self.max_tokens,
            'temperature': self.temperature,
            **kwargs
        }

        # Send the request.
        try:
            response = await self._send_request(params)
            self.stats['total_requests'] += 1

            # Parse, then cache the result.
            result = self._parse_response(response)
            self.cache.set(cache_key, result)
            return result
        except Exception as e:
            self.stats['errors'] += 1
            logger.error(f"LLM request failed: {e}")
            raise

    async def _send_request(self, params: Dict) -> Dict:
        """POST ``params`` as JSON and return the decoded JSON reply.

        Raises:
            Exception: If the HTTP status is not 200; the message contains
                the status code and the response body.
        """
        # Strip a trailing slash so the URL never contains "//chat".
        url = f"{self.base_url.rstrip('/')}/chat/completions"

        # The async session is created per request, so the auth headers
        # must be passed here; the requests.Session headers do not apply.
        async with aiohttp.ClientSession(
                headers=self._request_headers()) as session:
            async with session.post(url, json=params) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise Exception(
                        f"API error: {response.status} - {error_text}")
                return await response.json()

    def _build_messages(self, prompt: str,
                        context: List[Dict] = None) -> List[Dict]:
        """Return the message list: system prompt, context, then the user prompt."""
        messages = []

        # System prompt, only when one is configured.
        if self.config.system_prompt:
            messages.append({
                'role': 'system',
                'content': self.config.system_prompt
            })

        # Earlier conversation messages.
        if context:
            messages.extend(context)

        # The user prompt always comes last.
        messages.append({
            'role': 'user',
            'content': prompt
        })
        return messages

    def _parse_response(self, response: Dict) -> str:
        """Return ``response['choices'][0]['message']['content']``.

        Raises:
            Exception: If the response does not have that shape.
        """
        try:
            return response['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError) as e:
            # TypeError covers a None / non-subscriptable level in the payload.
            raise Exception(f"Invalid response format: {e}") from e

    def _generate_cache_key(self, prompt: str,
                            context: List[Dict],
                            kwargs: Dict) -> str:
        """Return an MD5 hex digest of the JSON-encoded prompt, context and kwargs."""
        key_data = {
            'prompt': prompt,
            'context': context,
            'kwargs': kwargs
        }
        # sort_keys makes the key independent of dict insertion order.
        return hashlib.md5(
            json.dumps(key_data, sort_keys=True).encode()
        ).hexdigest()

    def get_stats(self) -> Dict[str, Any]:
        """Return a shallow copy of the counters."""
        return self.stats.copy()
### 流式响应支持
### 工具管理器实现
python
class ToolManager:
    """Registry that stores tools by id, groups them by category,
    executes them and records every execution."""

    def __init__(self):
        """Start with no tools, no categories and an empty history."""
        self.tools: Dict[str, Tool] = {}
        self.tool_categories: Dict[str, List[str]] = {}
        self.execution_history: List[ToolExecution] = []

    def register_tool(self, tool: "Tool", category: str = None):
        """Register ``tool`` under ``tool.id``, optionally in ``category``.

        A tool whose id is already registered is ignored with a warning;
        in that case the category mapping is not touched either.
        """
        tool_id = tool.id
        if tool_id in self.tools:
            logger.warning(f"Tool already registered: {tool_id}")
            return

        self.tools[tool_id] = tool
        if category:
            self.tool_categories.setdefault(category, []).append(tool_id)
        logger.info(f"Tool registered: {tool_id}")

    async def execute_tool(self, tool_id: str,
                           parameters: Dict[str, Any]) -> "ToolResult":
        """Validate ``parameters``, run the tool and record the execution.

        Returns:
            Whatever the tool's ``execute`` coroutine returns.

        Raises:
            ValueError: If the tool id is unknown or the tool rejects the
                parameters. These cases are not added to the history.
            Exception: Anything raised by the tool is recorded, logged and
                re-raised unchanged.
        """
        tool = self.tools.get(tool_id)
        if tool is None:
            raise ValueError(f"Tool not found: {tool_id}")

        # Validate parameters before anything is recorded.
        if not tool.validate_parameters(parameters):
            raise ValueError("Invalid parameters")

        # Record the start of the execution.
        # NOTE(review): datetime.utcnow() returns a naive datetime and is
        # deprecated since Python 3.12; kept so stored values stay
        # comparable with existing naive timestamps.
        execution = ToolExecution(
            tool_id=tool_id,
            parameters=parameters,
            started_at=datetime.utcnow()
        )

        try:
            result = await tool.execute(parameters)
        except Exception as e:
            # Record the failure, then let the caller see the exception.
            execution.completed_at = datetime.utcnow()
            execution.success = False
            execution.error = str(e)
            self.execution_history.append(execution)
            logger.error(f"Tool execution failed: {e}")
            raise

        # Tools may report failure through the result instead of raising,
        # so the history mirrors the result's own flag when it has one.
        execution.completed_at = datetime.utcnow()
        execution.success = getattr(result, 'success', True)
        execution.result = result
        if not execution.success:
            execution.error = getattr(result, 'error', None)
        self.execution_history.append(execution)
        return result

    def get_tool(self, tool_id: str) -> "Tool":
        """Return the tool registered under ``tool_id``, or ``None``."""
        return self.tools.get(tool_id)

    def list_tools(self, category: str = None) -> List["Tool"]:
        """Return all tools, or only those registered in ``category``."""
        if category:
            tool_ids = self.tool_categories.get(category, [])
            return [self.tools[tid] for tid in tool_ids]
        return list(self.tools.values())

    def get_tool_schema(self, tool_id: str) -> Dict[str, Any]:
        """Return the tool's parameter schema, or ``None`` if it is unknown."""
        tool = self.get_tool(tool_id)
        if tool is None:
            return None
        return tool.get_schema()

    def get_execution_history(self, tool_id: str = None,
                              limit: int = 100) -> List["ToolExecution"]:
        """Return up to ``limit`` of the most recent executions, oldest first.

        Args:
            tool_id: When given, only executions of this tool are returned.
            limit: Maximum number of entries; zero or a negative value
                yields an empty list.
        """
        history = self.execution_history
        if tool_id:
            history = [e for e in history if e.tool_id == tool_id]
        # history[-0:] would be the whole list, so handle it explicitly.
        if limit <= 0:
            return []
        return history[-limit:]
### 示例工具实现
class FileReadTool(Tool):
    """Tool that reads a file as UTF-8 text and returns its contents."""

    def __init__(self):
        """Register the tool's id, display name and description."""
        super().__init__(
            tool_id="file_read",
            name="File Read",
            description="Read the contents of a file"
        )

    async def execute(self, parameters: Dict[str, Any]) -> ToolResult:
        """Read the file named by ``parameters['file_path']``.

        Returns:
            A successful ``ToolResult`` whose data holds ``content`` and
            ``file_path``, or a failed one carrying the error text. Every
            failure, including a missing ``file_path``, is reported through
            the result rather than raised.
        """
        import asyncio

        # NOTE(review): the path is opened as given, with no restriction
        # on which files may be read; confirm callers constrain it.
        file_path = parameters.get('file_path')
        if not file_path:
            return ToolResult(
                success=False,
                error="Missing required parameter: file_path",
                message=f"Failed to read file: {file_path}"
            )

        try:
            # File I/O blocks, so run it in the default executor instead
            # of stalling the event loop.
            loop = asyncio.get_running_loop()
            content = await loop.run_in_executor(
                None, self._read_text, file_path)
            return ToolResult(
                success=True,
                data={
                    'content': content,
                    'file_path': file_path
                },
                message=f"Successfully read file: {file_path}"
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                message=f"Failed to read file: {file_path}"
            )

    @staticmethod
    def _read_text(file_path: str) -> str:
        """Return the whole file decoded as UTF-8 (blocking)."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON-schema style description of the parameters."""
        return {
            'type': 'object',
            'properties': {
                'file_path': {
                    'type': 'string',
                    'description': 'Path to the file to read'
                }
            },
            'required': ['file_path']
        }
class CodeExecuteTool(Tool):
    """Tool that runs a code snippet in a subprocess and returns its output."""

    def __init__(self):
        """Register the tool's id, display name and description."""
        super().__init__(
            tool_id="code_execute",
            name="Code Execute",
            description="Execute code and return the output"
        )

    async def execute(self, parameters: Dict[str, Any]) -> ToolResult:
        """Run ``parameters['code']`` and report the outcome.

        Args:
            parameters: ``code`` (required), ``language`` (defaults to
                ``'python'``, the only supported value) and an optional
                ``timeout`` in seconds (no limit when omitted).

        Returns:
            A ``ToolResult`` whose data holds ``output``, ``error`` and
            ``return_code``. ``success`` is true only when the process
            exits with code 0. Every failure is reported through the
            result rather than raised.
        """
        code = parameters.get('code')
        if code is None:
            return ToolResult(
                success=False,
                error="Missing required parameter: code",
                message="Code execution failed"
            )
        language = parameters.get('language', 'python')
        timeout = parameters.get('timeout')

        try:
            if language == 'python':
                result = await self._execute_python(code, timeout=timeout)
            else:
                raise ValueError(f"Unsupported language: {language}")

            return_code = result.get('return_code', 0)
            data = {
                'output': result['output'],
                'error': result.get('error'),
                'return_code': return_code
            }
            if return_code == 0:
                return ToolResult(
                    success=True,
                    data=data,
                    message="Code executed successfully"
                )
            # A non-zero exit means the snippet itself failed; keep the
            # captured output so the caller can see why.
            return ToolResult(
                success=False,
                data=data,
                error=data['error'] or f"Process exited with code {return_code}",
                message="Code execution failed"
            )
        except Exception as e:
            return ToolResult(
                success=False,
                error=str(e),
                message="Code execution failed"
            )

    async def _execute_python(self, code: str,
                              timeout: float = None) -> Dict[str, Any]:
        """Run ``code`` with ``python3 -c`` and capture its streams.

        Args:
            code: Source passed to the interpreter.
            timeout: Seconds to wait before killing the process;
                ``None`` waits indefinitely.

        Returns:
            Dict with ``output`` (stdout text), ``error`` (stderr text or
            ``None`` when stderr is empty) and ``return_code``.

        Raises:
            TimeoutError: If the process does not finish within ``timeout``.
        """
        # NOTE(review): the snippet runs as a plain child process with no
        # sandboxing visible here; confirm isolation is handled elsewhere.
        process = await asyncio.create_subprocess_exec(
            'python3',
            '-c',
            code,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(), timeout=timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"Code execution timed out after {timeout} seconds") from None
        finally:
            # Never leave the child running after a timeout or cancellation.
            if process.returncode is None:
                process.kill()
                await process.wait()

        # errors='replace' keeps the output when it is not valid UTF-8.
        return {
            'output': stdout.decode('utf-8', errors='replace'),
            'error': stderr.decode('utf-8', errors='replace') if stderr else None,
            'return_code': process.returncode
        }

    def get_schema(self) -> Dict[str, Any]:
        """Return the JSON-schema style description of the parameters."""
        return {
            'type': 'object',
            'properties': {
                'code': {
                    'type': 'string',
                    'description': 'Code to execute'
                },
                'language': {
                    'type': 'string',
                    'description': 'Programming language',
                    'default': 'python'
                },
                'timeout': {
                    'type': 'number',
                    'description': 'Maximum run time in seconds; omit for no limit'
                }
            },
            'required': ['code']
        }
## 29.2.3 记忆系统
记忆系统负责存储和管理 Agent 的知识、经验和交互历史。
### 记忆系统架构
## 29.2.4 任务规划器
任务规划器负责将用户请求分解为可执行的任务序列。
通过实现这些核心组件,我们可以构建一个功能完整的编程 Agent 基础框架。